feat(sse): wire @defer incremental delivery into Absinthe.Plug#2

Open
juscyllan wants to merge 2 commits into
gigsmart:gigmart/defer-stream-incremental from
juscyllan:fix/sse-incremental-delivery-wiring

Conversation

@juscyllan juscyllan commented Apr 11, 2026

Summary

  • Wires SSE incremental delivery into the main Absinthe.Plug request path
  • Queries with @defer + Accept: text/event-stream now return chunked SSE events
  • Queries without @defer continue to return standard JSON

What changed

lib/absinthe/plug.ex (115 insertions, 4 deletions)

  1. call/2 — new case for {:ok, :streaming} (already-sent SSE response)
  2. run_query/4 — detects incremental_delivery: true + SSE Accept header → routes to deliver_incremental_sse
  3. deliver_incremental_sse/2 (new) — splits resolved result into initial/incremental payloads using defer_info from streaming context, sends as SSE events:
    • event: next — initial data (deferred fields stripped) + pending + hasNext: true
    • event: next — incremental data (deferred fields) + hasNext: false
    • event: complete
  4. Clears before_send callbacks before send_chunked to prevent ETag plug crash on nil resp_body
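The event sequence described in step 3 can be sketched as a minimal framing helper. This is a hypothetical illustration of the SSE wire format, not the plug's actual `EventFormatter` API, and the JSON payload shapes are assumptions based on the description above:

```elixir
# Hypothetical sketch of the SSE framing described above; the real
# EventFormatter module may have a different signature.
defmodule SSEFraming do
  # One SSE event: an `event:` line, a `data:` line, and a blank-line separator.
  def format_event(name, json), do: "event: #{name}\ndata: #{json}\n\n"
end

# A @defer response is delivered as three frames:
initial     = SSEFraming.format_event("next", ~s({"data":{"name":"Ada"},"pending":[{"id":"0","path":[]}],"hasNext":true}))
incremental = SSEFraming.format_event("next", ~s({"incremental":[{"id":"0","data":{"fields":["a","b"]}}],"hasNext":false}))
complete    = SSEFraming.format_event("complete", "{}")

IO.write(initial <> incremental <> complete)
```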

Test plan

  • @defer query: initial event has name only, incremental has fields array
  • Normal query: returns JSON as before
  • SSE without @defer: returns JSON (no streaming)
  • No crash from ETag/before_send hooks

Depends on


Note

Medium Risk
Adds chunked SSE response handling for @defer queries and tweaks subscription SSE headers/keep-alive behavior, which can affect HTTP response semantics and intermediaries (proxies/caches). Core GraphQL execution remains the same for non-incremental requests.

Overview
Wires @defer incremental delivery into Absinthe.Plug: when a pipeline run reports incremental_delivery: true and the request Accepts SSE, the plug now switches to a chunked text/event-stream response and returns {:ok, :streaming} to avoid double-sending.

Adds deliver_incremental_sse/2 to emit an initial next event with deferred fields stripped plus pending/hasNext, followed by next events for each deferred payload and a final complete event; it also clears before_send callbacks before send_chunked/200 to avoid issues with plugs like ETag.

Subscription SSE setup is simplified to inline headers + send_chunked/200, and the keep-alive comment payload changes from : keep-alive to :ping.
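For reference, an SSE comment is any line beginning with a colon, which conforming clients must discard, so either keep-alive payload is valid on the wire. A quick sketch:

```elixir
# An SSE comment frame: a line starting with ":" that clients discard.
# Either spelling keeps the connection alive through idle periods.
old_keep_alive = ": keep-alive\n\n"
new_keep_alive = ":ping\n\n"

# Both begin with the comment marker, so clients ignore them identically.
IO.inspect({String.starts_with?(old_keep_alive, ":"), String.starts_with?(new_keep_alive, ":")})
```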

Reviewed by Cursor Bugbot for commit dd6f8ff. Bugbot is set up for automated code reviews on this repo. Configure here.

Enable @defer SSE transport in the main plug request path:

- run_query: detect incremental_delivery flag + SSE Accept header,
  route to deliver_incremental_sse instead of standard JSON response
- deliver_incremental_sse: split resolved result into initial (without
  deferred fields) and incremental payloads, send as SSE events
  (next/initial, next/incremental, complete)
- call: handle {:ok, :streaming} result for already-sent SSE responses
- Clear before_send callbacks (ETag etc.) before send_chunked to prevent
  crash on nil resp_body

Queries without @defer continue to return standard JSON responses.

@cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 4 potential issues.


Comment thread lib/absinthe/plug.ex
|> Plug.Conn.put_resp_header("content-type", "text/event-stream")
|> Plug.Conn.put_resp_header("cache-control", "no-cache")
|> Plug.Conn.put_resp_header("x-accel-buffering", "no")
|> Map.update(:private, %{}, &Map.put(&1, :before_send, []))

Clearing before_send on wrong conn field

High Severity

The code clears before_send inside conn.private, but Plug.Conn stores before_send callbacks as a top-level struct field at conn.before_send. The send_chunked/2 function calls run_before_send, which reads from conn.before_send, not conn.private.before_send. This means the ETag/before_send crash described in the comment is not actually prevented — the callbacks will still fire with resp_body: nil during send_chunked.
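The distinction can be illustrated with a stand-in struct (not the real `%Plug.Conn{}`, which carries many more fields):

```elixir
defmodule ConnStub do
  # Stand-in for %Plug.Conn{}: before_send is a TOP-LEVEL struct field,
  # while :private is a separate map.
  defstruct before_send: [], private: %{}
end

conn = %ConnStub{before_send: [fn c -> c end]}

# What the PR does: updates :private, leaving the real callback list intact.
wrong = %{conn | private: Map.put(conn.private, :before_send, [])}

# What run_before_send/2 actually reads: the top-level field.
right = %{conn | before_send: []}

IO.inspect({length(wrong.before_send), length(right.before_send)})
# => {1, 0} — only the second form actually clears the callbacks
```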

Comment thread lib/absinthe/plug.ex Outdated

# Send initial event
initial_event = EventFormatter.format_event("next", initial_response, 0)
{:ok, conn} = Plug.Conn.chunk(conn, initial_event)

Hard pattern match on chunk crashes on disconnect

Medium Severity

The initial chunk uses a hard pattern match {:ok, conn} = Plug.Conn.chunk(...) which raises MatchError if the client has disconnected and chunk returns {:error, reason}. Later chunk calls in the same function properly handle errors with case statements, making this inconsistency a crash risk under normal disconnect conditions.
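A tolerant shape for every chunk call might look like the sketch below; `send_event/3` and its injected chunk function are illustrative (the real plug would pass `&Plug.Conn.chunk/2`), not the plug's actual code:

```elixir
defmodule ChunkGuard do
  # Wrap a chunk call so a disconnected client halts the stream instead of
  # raising MatchError. The chunk function is injected here so the sketch
  # runs without Plug; in the plug it would be &Plug.Conn.chunk/2.
  def send_event(conn, event, chunk_fun) do
    case chunk_fun.(conn, event) do
      {:ok, conn} -> {:cont, conn}
      {:error, _reason} -> {:halt, conn}
    end
  end
end

IO.inspect(ChunkGuard.send_event(:conn, "event: next\n\n", fn conn, _ -> {:ok, conn} end))
IO.inspect(ChunkGuard.send_event(:conn, "event: next\n\n", fn _, _ -> {:error, :closed} end))
# => {:cont, :conn} then {:halt, :conn}
```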

Comment thread lib/absinthe/plug.ex
{conn, {:ok, :streaming}}
else
{conn, {:ok, bp.result}}
end

SSE triggers for wildcard Accept breaking standard clients

High Severity

accepts_sse? returns true for Accept: */*, which is the default header for most HTTP clients (curl, fetch, Postman, etc.). This means nearly all @defer queries through Absinthe.Plug will get SSE chunked responses instead of standard JSON, contradicting the PR description which states SSE is for Accept: text/event-stream specifically. Standard GraphQL clients expecting JSON will receive an unexpected text/event-stream response.
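One stricter check, sketched here as an assumption about what `accepts_sse?` could do rather than the plug's current implementation, is to opt into SSE only when the client names the media type explicitly:

```elixir
defmodule AcceptCheck do
  # Treat a request as SSE-capable only when text/event-stream appears
  # literally in the Accept header; a bare */* keeps the JSON path.
  def accepts_sse?(accept) when is_binary(accept) do
    accept
    |> String.split(",")
    |> Enum.map(fn part ->
      part |> String.split(";") |> hd() |> String.trim() |> String.downcase()
    end)
    |> Enum.member?("text/event-stream")
  end
end

IO.inspect(AcceptCheck.accepts_sse?("*/*"))                                        # false
IO.inspect(AcceptCheck.accepts_sse?("text/event-stream"))                          # true
IO.inspect(AcceptCheck.accepts_sse?("application/json, text/event-stream;q=0.9"))  # true
```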

Comment thread lib/absinthe/plug.ex Outdated
conn
|> ConnectionManager.setup_sse_headers()
|> put_resp_header("content-type", "text/event-stream")
|> send_chunked(200)

Subscribe function drops critical SSE response headers

Medium Severity

Replacing ConnectionManager.setup_sse_headers() with only put_resp_header("content-type", "text/event-stream") drops five headers that setup_sse_headers previously set: cache-control: no-cache, connection: keep-alive, x-accel-buffering: no, and two CORS headers. Missing cache-control allows proxies to cache SSE events, and missing x-accel-buffering causes nginx to buffer the response, preventing real-time delivery of subscription events.
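A sketch of restoring the full header set inline; the exact CORS header names below are guesses, since the review only says "two CORS headers":

```elixir
# Header list a subscription SSE response plausibly needs; in the real plug
# these would be applied with Plug.Conn.put_resp_header/3 before
# send_chunked/2.
sse_headers = [
  {"content-type", "text/event-stream"},
  {"cache-control", "no-cache"},        # keep proxies from caching events
  {"connection", "keep-alive"},
  {"x-accel-buffering", "no"},          # stop nginx from buffering the stream
  # The two CORS header names below are assumptions:
  {"access-control-allow-origin", "*"},
  {"access-control-allow-headers", "content-type"}
]

# In the plug this would be something like:
#   Enum.reduce(sse_headers, conn, fn {k, v}, conn ->
#     Plug.Conn.put_resp_header(conn, k, v)
#   end)
IO.inspect(length(sse_headers))
```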


cursor Bot commented Apr 11, 2026

